1 /*
2 * Copyright (C) 2010 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.google.common.util.concurrent;
18
19 import com.google.common.collect.ObjectArrays;
20
21 import java.util.AbstractQueue;
22 import java.util.Collection;
23 import java.util.ConcurrentModificationException;
24 import java.util.Iterator;
25 import java.util.NoSuchElementException;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.TimeUnit;
28
29 import javax.annotation.Nullable;
30
31 /**
32 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
33 * array. This queue orders elements FIFO (first-in-first-out). The
34 * <em>head</em> of the queue is that element that has been on the
35 * queue the longest time. The <em>tail</em> of the queue is that
36 * element that has been on the queue the shortest time. New elements
37 * are inserted at the tail of the queue, and the queue retrieval
38 * operations obtain elements at the head of the queue.
39 *
40 * <p>This is a classic "bounded buffer", in which a
41 * fixed-sized array holds elements inserted by producers and
42 * extracted by consumers. Once created, the capacity cannot be
43 * increased. Attempts to <tt>put</tt> an element into a full queue
44 * will result in the operation blocking; attempts to <tt>take</tt> an
45 * element from an empty queue will similarly block.
46 *
47 * <p> This class supports an optional fairness policy for ordering
48 * waiting producer and consumer threads. By default, this ordering
49 * is not guaranteed. However, a queue constructed with fairness set
50 * to <tt>true</tt> grants threads access in FIFO order. Fairness
51 * generally decreases throughput but reduces variability and avoids
52 * starvation.
53 *
54 * <p>This class and its iterator implement all of the
55 * <em>optional</em> methods of the {@link Collection} and {@link
56 * Iterator} interfaces.
57 *
58 * @author Doug Lea
59 * @author Justin T. Sampson
60 * @param <E> the type of elements held in this collection
61 */
62 public class MonitorBasedArrayBlockingQueue<E> extends AbstractQueue<E>
63 implements BlockingQueue<E> {
64
65 // Based on revision 1.58 of ArrayBlockingQueue by Doug Lea, from
66 // http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/
67
/**
 * The queued items, stored in a fixed-length array that is used circularly
 * (see {@code inc}). Slots that do not currently hold an element are null.
 */
final E[] items;
/** items index for next take, poll or remove */
int takeIndex;
/** items index for next put, offer, or add. */
int putIndex;
/** Number of items in the queue */
private int count;

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook. Here the two conditions are expressed as
 * the Monitor guards notEmpty and notFull declared below.
 */

/** Monitor guarding all access */
final Monitor monitor;

/** Guard for waiting takes; satisfied while count is greater than zero */
private final Monitor.Guard notEmpty;

/** Guard for waiting puts; satisfied while count is less than items.length */
private final Monitor.Guard notFull;
90
91 // Internal helper methods
92
93 /**
94 * Circularly increment i.
95 */
96 final int inc(int i) {
97 return (++i == items.length) ? 0 : i;
98 }
99
100 /**
101 * Inserts element at current put position, advances, and signals.
102 * Call only when occupying monitor.
103 */
104 private void insert(E x) {
105 items[putIndex] = x;
106 putIndex = inc(putIndex);
107 ++count;
108 }
109
110 /**
111 * Extracts element at current take position, advances, and signals.
112 * Call only when occupying monitor.
113 */
114 private E extract() {
115 final E[] items = this.items;
116 E x = items[takeIndex];
117 items[takeIndex] = null;
118 takeIndex = inc(takeIndex);
119 --count;
120 return x;
121 }
122
123 /**
124 * Utility for remove and iterator.remove: Delete item at position i.
125 * Call only when occupying monitor.
126 */
127 void removeAt(int i) {
128 final E[] items = this.items;
129 // if removing front item, just advance
130 if (i == takeIndex) {
131 items[takeIndex] = null;
132 takeIndex = inc(takeIndex);
133 } else {
134 // slide over all others up through putIndex.
135 for (;;) {
136 int nexti = inc(i);
137 if (nexti != putIndex) {
138 items[i] = items[nexti];
139 i = nexti;
140 } else {
141 items[i] = null;
142 putIndex = i;
143 break;
144 }
145 }
146 }
147 --count;
148 }
149
/**
 * Creates a <tt>MonitorBasedArrayBlockingQueue</tt> with the given (fixed)
 * capacity and the default (non-fair) access policy.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
 */
public MonitorBasedArrayBlockingQueue(int capacity) {
  // Delegate with fairness switched off.
  this(capacity, false);
}
160
/**
 * Creates a <tt>MonitorBasedArrayBlockingQueue</tt> with the given (fixed)
 * capacity and the specified access policy.
 *
 * @param capacity the capacity of this queue
 * @param fair if <tt>true</tt> then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if <tt>false</tt> the access order is unspecified.
 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
 */
public MonitorBasedArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0) {
    throw new IllegalArgumentException();
  }
  this.items = newEArray(capacity);
  this.monitor = new Monitor(fair);

  // Takers may proceed once at least one element is present.
  this.notEmpty = new Monitor.Guard(monitor) {
    @Override
    public boolean isSatisfied() {
      return count > 0;
    }
  };

  // Putters may proceed while at least one slot is free.
  this.notFull = new Monitor.Guard(monitor) {
    @Override
    public boolean isSatisfied() {
      return count < items.length;
    }
  };
}
187
188 @SuppressWarnings("unchecked") // please don't try this home, kids
189 private static <E> E[] newEArray(int capacity) {
190 return (E[]) new Object[capacity];
191 }
192
/**
 * Creates a <tt>MonitorBasedArrayBlockingQueue</tt> with the given (fixed)
 * capacity and the specified access policy, initially containing the
 * elements of the given collection, added in traversal order of the
 * collection's iterator.
 *
 * @param capacity the capacity of this queue
 * @param fair if <tt>true</tt> then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if <tt>false</tt> the access order is unspecified.
 * @param c the collection of elements to initially contain
 * @throws IllegalArgumentException if <tt>capacity</tt> is less than
 *         <tt>c.size()</tt>, or less than 1.
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public MonitorBasedArrayBlockingQueue(int capacity, boolean fair,
    Collection<? extends E> c) {
  this(capacity, fair);
  if (capacity < c.size()) {
    throw new IllegalArgumentException();
  }

  // Copy the elements over in the collection's iteration order.
  for (E element : c) {
    add(element);
  }
}
218
219 /**
220 * Inserts the specified element at the tail of this queue if it is
221 * possible to do so immediately without exceeding the queue's capacity,
222 * returning <tt>true</tt> upon success and throwing an
223 * <tt>IllegalStateException</tt> if this queue is full.
224 *
225 * @param e the element to add
226 * @return <tt>true</tt> (as specified by {@link Collection#add})
227 * @throws IllegalStateException if this queue is full
228 * @throws NullPointerException if the specified element is null
229 */
230 @Override public boolean add(E e) {
231 return super.add(e);
232 }
233
234 /**
235 * Inserts the specified element at the tail of this queue if it is
236 * possible to do so immediately without exceeding the queue's capacity,
237 * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
238 * is full. This method is generally preferable to method {@link #add},
239 * which can fail to insert an element only by throwing an exception.
240 *
241 * @throws NullPointerException if the specified element is null
242 */
243 @Override
244 public boolean offer(E e) {
245 if (e == null) throw new NullPointerException();
246 final Monitor monitor = this.monitor;
247 if (monitor.enterIf(notFull)) {
248 try {
249 insert(e);
250 return true;
251 } finally {
252 monitor.leave();
253 }
254 } else {
255 return false;
256 }
257 }
258
259 /**
260 * Inserts the specified element at the tail of this queue, waiting
261 * for space to become available if the queue is full.
262 *
263 * @throws InterruptedException {@inheritDoc}
264 * @throws NullPointerException {@inheritDoc}
265 */
266 @Override
267 public void put(E e) throws InterruptedException {
268 if (e == null) throw new NullPointerException();
269 final Monitor monitor = this.monitor;
270 monitor.enterWhen(notFull);
271 try {
272 insert(e);
273 } finally {
274 monitor.leave();
275 }
276 }
277
278 /**
279 * Inserts the specified element at the tail of this queue, waiting
280 * up to the specified wait time for space to become available if
281 * the queue is full.
282 *
283 * @throws InterruptedException {@inheritDoc}
284 * @throws NullPointerException {@inheritDoc}
285 */
286 @Override
287 public boolean offer(E e, long timeout, TimeUnit unit)
288 throws InterruptedException {
289
290 if (e == null) throw new NullPointerException();
291 final Monitor monitor = this.monitor;
292 if (monitor.enterWhen(notFull, timeout, unit)) {
293 try {
294 insert(e);
295 return true;
296 } finally {
297 monitor.leave();
298 }
299 } else {
300 return false;
301 }
302 }
303
304 @Override
305 public E poll() {
306 final Monitor monitor = this.monitor;
307 if (monitor.enterIf(notEmpty)) {
308 try {
309 return extract();
310 } finally {
311 monitor.leave();
312 }
313 } else {
314 return null;
315 }
316 }
317
318 @Override
319 public E take() throws InterruptedException {
320 final Monitor monitor = this.monitor;
321 monitor.enterWhen(notEmpty);
322 try {
323 return extract();
324 } finally {
325 monitor.leave();
326 }
327 }
328
329 @Override
330 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
331 final Monitor monitor = this.monitor;
332 if (monitor.enterWhen(notEmpty, timeout, unit)) {
333 try {
334 return extract();
335 } finally {
336 monitor.leave();
337 }
338 } else {
339 return null;
340 }
341 }
342
343 @Override
344 public E peek() {
345 final Monitor monitor = this.monitor;
346 if (monitor.enterIf(notEmpty)) {
347 try {
348 return items[takeIndex];
349 } finally {
350 monitor.leave();
351 }
352 } else {
353 return null;
354 }
355 }
356
357 // this doc comment is overridden to remove the reference to collections
358 // greater in size than Integer.MAX_VALUE
359 /**
360 * Returns the number of elements in this queue.
361 *
362 * @return the number of elements in this queue
363 */
364 @Override public int size() {
365 final Monitor monitor = this.monitor;
366 monitor.enter();
367 try {
368 return count;
369 } finally {
370 monitor.leave();
371 }
372 }
373
374 // this doc comment is a modified copy of the inherited doc comment,
375 // without the reference to unlimited queues.
376 /**
377 * Returns the number of additional elements that this queue can ideally
378 * (in the absence of memory or resource constraints) accept without
379 * blocking. This is always equal to the initial capacity of this queue
380 * less the current <tt>size</tt> of this queue.
381 *
382 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
383 * an element will succeed by inspecting <tt>remainingCapacity</tt>
384 * because it may be the case that another thread is about to
385 * insert or remove an element.
386 */
387 @Override
388 public int remainingCapacity() {
389 final Monitor monitor = this.monitor;
390 monitor.enter();
391 try {
392 return items.length - count;
393 } finally {
394 monitor.leave();
395 }
396 }
397
398 /**
399 * Removes a single instance of the specified element from this queue,
400 * if it is present. More formally, removes an element <tt>e</tt> such
401 * that <tt>o.equals(e)</tt>, if this queue contains one or more such
402 * elements.
403 * Returns <tt>true</tt> if this queue contained the specified element
404 * (or equivalently, if this queue changed as a result of the call).
405 *
406 * @param o element to be removed from this queue, if present
407 * @return <tt>true</tt> if this queue changed as a result of the call
408 */
409 @Override public boolean remove(@Nullable Object o) {
410 if (o == null) return false;
411 final E[] items = this.items;
412 final Monitor monitor = this.monitor;
413 monitor.enter();
414 try {
415 int i = takeIndex;
416 int k = 0;
417 for (;;) {
418 if (k++ >= count)
419 return false;
420 if (o.equals(items[i])) {
421 removeAt(i);
422 return true;
423 }
424 i = inc(i);
425 }
426 } finally {
427 monitor.leave();
428 }
429 }
430
431 /**
432 * Returns <tt>true</tt> if this queue contains the specified element.
433 * More formally, returns <tt>true</tt> if and only if this queue contains
434 * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
435 *
436 * @param o object to be checked for containment in this queue
437 * @return <tt>true</tt> if this queue contains the specified element
438 */
439 @Override public boolean contains(@Nullable Object o) {
440 if (o == null) return false;
441 final E[] items = this.items;
442 final Monitor monitor = this.monitor;
443 monitor.enter();
444 try {
445 int i = takeIndex;
446 int k = 0;
447 while (k++ < count) {
448 if (o.equals(items[i]))
449 return true;
450 i = inc(i);
451 }
452 return false;
453 } finally {
454 monitor.leave();
455 }
456 }
457
458 /**
459 * Returns an array containing all of the elements in this queue, in
460 * proper sequence.
461 *
462 * <p>The returned array will be "safe" in that no references to it are
463 * maintained by this queue. (In other words, this method must allocate
464 * a new array). The caller is thus free to modify the returned array.
465 *
466 * <p>This method acts as bridge between array-based and collection-based
467 * APIs.
468 *
469 * @return an array containing all of the elements in this queue
470 */
471 @Override public Object[] toArray() {
472 final E[] items = this.items;
473 final Monitor monitor = this.monitor;
474 monitor.enter();
475 try {
476 Object[] a = new Object[count];
477 int k = 0;
478 int i = takeIndex;
479 while (k < count) {
480 a[k++] = items[i];
481 i = inc(i);
482 }
483 return a;
484 } finally {
485 monitor.leave();
486 }
487 }
488
489 /**
490 * Returns an array containing all of the elements in this queue, in
491 * proper sequence; the runtime type of the returned array is that of
492 * the specified array. If the queue fits in the specified array, it
493 * is returned therein. Otherwise, a new array is allocated with the
494 * runtime type of the specified array and the size of this queue.
495 *
496 * <p>If this queue fits in the specified array with room to spare
497 * (i.e., the array has more elements than this queue), the element in
498 * the array immediately following the end of the queue is set to
499 * <tt>null</tt>.
500 *
501 * <p>Like the {@link #toArray()} method, this method acts as bridge between
502 * array-based and collection-based APIs. Further, this method allows
503 * precise control over the runtime type of the output array, and may,
504 * under certain circumstances, be used to save allocation costs.
505 *
506 * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
507 * The following code can be used to dump the queue into a newly
508 * allocated array of <tt>String</tt>:
509 *
510 * <pre>
511 * String[] y = x.toArray(new String[0]);</pre>
512 *
513 * <p>Note that <tt>toArray(new Object[0])</tt> is identical in function to
514 * <tt>toArray()</tt>.
515 *
516 * @param a the array into which the elements of the queue are to
517 * be stored, if it is big enough; otherwise, a new array of the
518 * same runtime type is allocated for this purpose
519 * @return an array containing all of the elements in this queue
520 * @throws ArrayStoreException if the runtime type of the specified array
521 * is not a supertype of the runtime type of every element in
522 * this queue
523 * @throws NullPointerException if the specified array is null
524 */
525 @Override public <T> T[] toArray(T[] a) {
526 final E[] items = this.items;
527 final Monitor monitor = this.monitor;
528 monitor.enter();
529 try {
530 if (a.length < count)
531 a = ObjectArrays.newArray(a, count);
532
533 int k = 0;
534 int i = takeIndex;
535 while (k < count) {
536 // This cast is not itself safe, but the following statement
537 // will fail if the runtime type of items[i] is not assignable
538 // to the runtime type of a[k++], which is all that the method
539 // contract requires (see @throws ArrayStoreException above).
540 @SuppressWarnings("unchecked")
541 T t = (T) items[i];
542 a[k++] = t;
543 i = inc(i);
544 }
545 if (a.length > count)
546 a[count] = null;
547 return a;
548 } finally {
549 monitor.leave();
550 }
551 }
552
553 @Override public String toString() {
554 final Monitor monitor = this.monitor;
555 monitor.enter();
556 try {
557 return super.toString();
558 } finally {
559 monitor.leave();
560 }
561 }
562
563 /**
564 * Atomically removes all of the elements from this queue.
565 * The queue will be empty after this call returns.
566 */
567 @Override public void clear() {
568 final E[] items = this.items;
569 final Monitor monitor = this.monitor;
570 monitor.enter();
571 try {
572 int i = takeIndex;
573 int k = count;
574 while (k-- > 0) {
575 items[i] = null;
576 i = inc(i);
577 }
578 count = 0;
579 putIndex = 0;
580 takeIndex = 0;
581 } finally {
582 monitor.leave();
583 }
584 }
585
586 /**
587 * @throws UnsupportedOperationException {@inheritDoc}
588 * @throws ClassCastException {@inheritDoc}
589 * @throws NullPointerException {@inheritDoc}
590 * @throws IllegalArgumentException {@inheritDoc}
591 */
592 @Override
593 public int drainTo(Collection<? super E> c) {
594 if (c == null)
595 throw new NullPointerException();
596 if (c == this)
597 throw new IllegalArgumentException();
598 final E[] items = this.items;
599 final Monitor monitor = this.monitor;
600 monitor.enter();
601 try {
602 int i = takeIndex;
603 int n = 0;
604 int max = count;
605 while (n < max) {
606 c.add(items[i]);
607 items[i] = null;
608 i = inc(i);
609 ++n;
610 }
611 if (n > 0) {
612 count = 0;
613 putIndex = 0;
614 takeIndex = 0;
615 }
616 return n;
617 } finally {
618 monitor.leave();
619 }
620 }
621
622 /**
623 * @throws UnsupportedOperationException {@inheritDoc}
624 * @throws ClassCastException {@inheritDoc}
625 * @throws NullPointerException {@inheritDoc}
626 * @throws IllegalArgumentException {@inheritDoc}
627 */
628 @Override
629 public int drainTo(Collection<? super E> c, int maxElements) {
630 if (c == null)
631 throw new NullPointerException();
632 if (c == this)
633 throw new IllegalArgumentException();
634 if (maxElements <= 0)
635 return 0;
636 final E[] items = this.items;
637 final Monitor monitor = this.monitor;
638 monitor.enter();
639 try {
640 int i = takeIndex;
641 int n = 0;
642 int max = (maxElements < count) ? maxElements : count;
643 while (n < max) {
644 c.add(items[i]);
645 items[i] = null;
646 i = inc(i);
647 ++n;
648 }
649 if (n > 0) {
650 count -= n;
651 takeIndex = i;
652 }
653 return n;
654 } finally {
655 monitor.leave();
656 }
657 }
658
659 /**
660 * Returns an iterator over the elements in this queue in proper sequence.
661 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
662 * will never throw {@link ConcurrentModificationException},
663 * and guarantees to traverse elements as they existed upon
664 * construction of the iterator, and may (but is not guaranteed to)
665 * reflect any modifications subsequent to construction.
666 *
667 * @return an iterator over the elements in this queue in proper sequence
668 */
669 @Override public Iterator<E> iterator() {
670 final Monitor monitor = this.monitor;
671 monitor.enter();
672 try {
673 return new Itr();
674 } finally {
675 monitor.leave();
676 }
677 }
678
679 /**
680 * Iterator for MonitorBasedArrayBlockingQueue
681 */
682 private class Itr implements Iterator<E> {
683 /**
684 * Index of element to be returned by next,
685 * or a negative number if no such.
686 */
687 private int nextIndex;
688
689 /**
690 * nextItem holds on to item fields because once we claim
691 * that an element exists in hasNext(), we must return it in
692 * the following next() call even if it was in the process of
693 * being removed when hasNext() was called.
694 */
695 private E nextItem;
696
697 /**
698 * Index of element returned by most recent call to next.
699 * Reset to -1 if this element is deleted by a call to remove.
700 */
701 private int lastRet;
702
703 Itr() {
704 lastRet = -1;
705 if (count == 0)
706 nextIndex = -1;
707 else {
708 nextIndex = takeIndex;
709 nextItem = items[takeIndex];
710 }
711 }
712
713 @Override
714 public boolean hasNext() {
715 /*
716 * No sync. We can return true by mistake here
717 * only if this iterator passed across threads,
718 * which we don't support anyway.
719 */
720 return nextIndex >= 0;
721 }
722
723 /**
724 * Checks whether nextIndex is valid; if so setting nextItem.
725 * Stops iterator when either hits putIndex or sees null item.
726 */
727 private void checkNext() {
728 if (nextIndex == putIndex) {
729 nextIndex = -1;
730 nextItem = null;
731 } else {
732 nextItem = items[nextIndex];
733 if (nextItem == null)
734 nextIndex = -1;
735 }
736 }
737
738 @Override
739 public E next() {
740 final Monitor monitor = MonitorBasedArrayBlockingQueue.this.monitor;
741 monitor.enter();
742 try {
743 if (nextIndex < 0)
744 throw new NoSuchElementException();
745 lastRet = nextIndex;
746 E x = nextItem;
747 nextIndex = inc(nextIndex);
748 checkNext();
749 return x;
750 } finally {
751 monitor.leave();
752 }
753 }
754
755 @Override
756 public void remove() {
757 final Monitor monitor = MonitorBasedArrayBlockingQueue.this.monitor;
758 monitor.enter();
759 try {
760 int i = lastRet;
761 if (i == -1)
762 throw new IllegalStateException();
763 lastRet = -1;
764
765 int ti = takeIndex;
766 removeAt(i);
767 // back up cursor (reset to front if was first element)
768 nextIndex = (i == ti) ? takeIndex : i;
769 checkNext();
770 } finally {
771 monitor.leave();
772 }
773 }
774 }
775 }